【CDK】StepFunctionsのMapステートを使って処理を並列実行させる

【CDK】StepFunctionsのMapステートを使って処理を並列実行させる

Clock Icon2023.10.23

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

先日、複数のデータに対して同じ処理を実行させたいという場面があり、Step FunctionsのMapステートを使って実装しました。とても便利だったので、復習を兼ねて記事に残しておこうと思います。

やりたいこと

下図のようなフローをCDKで作成します。

まずGetListで処理を実行させたいデータの一覧を取得します。続いてその一覧をMapステートに渡して、各データに対してSomeProcessを並列に実行させるようにします。

環境

本記事を執筆したときの環境は以下になります。

  • AWS CDK 2.84.0
  • Node.js v18.16.0

また、CDKのインストールやブートストラップ等は実施済みであるとします。もしまだ実施していないという場合は、公式サイトを参考にしてみてください。

AWS CDK の開始方法 - AWS Cloud Development Kit (AWS CDK) v2

やってみる

CDKプロジェクトの作成

プロジェクトフォルダを作成し、下記コマンドを実行します。ここでは言語としてTypeScriptを選択します。

cdk init --language typescript

プロジェクトフォルダ内に必要なファイルが作成されます。

Lambda関数用コードの作成

まずはGetListSomeProcessの2つのLambda関数用コードを作成します。

プロジェクトフォルダ直下にlambdaというフォルダを作成し、get-list.tssome-process.tsを作成します。

get-list.tsのコードは以下のようにしてみました。

interface Person {
  name: string
  age: number
  address: string
}
export const handler = async (): Promise<Person[]> => {
  try {
    console.log("start get-list.ts")

    // 並列に処理したい各データ
    const data = [
      {
        name: 'Tanaka',
        age: 20,
        address: 'Tokyo',
      },
      {
        name: 'Mizuno',
        age: 30,
        address: 'Osaka',
      },
      {
        name: 'Yamada',
        age: 25,
        address: 'Nagoya',
      },
      {
        name: 'Sato',
        age: 28,
        address: 'Fukuoka',
      },
    ]

    return data
  } catch (error) {
    throw error
  }
}

DBか何かから人物のデータ一覧を取得して、それぞれのデータに対して何か並列に処理を行うというイメージです。Mapステートに渡すために、データを配列形式で返します。

some-process.tsのコードは次のようにしてみました。

interface ProcessEvent {
  param: {
    name: string
    age: number
  }
}
export const handler = async (event: ProcessEvent): Promise<void> => {
  try {
    console.log(`start some-process.ts name: ${event.param.name} age: ${event.param.age}`)

    // 時間がかかる処理
    await sleep(5000)

    console.log(`end some-process.ts name: ${event.param.name} age: ${event.param.age}`)
  } catch (error) {
    throw error
  }
}

const sleep = (msec: number) =>
  new Promise((resolve) => setTimeout(resolve, msec))

このLambda関数は人物のデータを受け取って、何か時間のかかる処理を行うというイメージです。引数としてProcessEvent型の値を受け取るように定義しています。

Step Functionsの作成

続いてStep Functionsのリソースを作っていきます。

libフォルダ配下に自動的に作成されたtsファイルがあると思いますので、そのファイルを次のように編集します。

import * as cdk from 'aws-cdk-lib';
import { Duration } from 'aws-cdk-lib';
import { Runtime } from 'aws-cdk-lib/aws-lambda';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import { LambdaInvoke } from 'aws-cdk-lib/aws-stepfunctions-tasks';
import { LogLevel, StateMachine, Map } from 'aws-cdk-lib/aws-stepfunctions'
import { Construct } from 'constructs';
import { LogGroup, RetentionDays } from 'aws-cdk-lib/aws-logs';
// import * as sqs from 'aws-cdk-lib/aws-sqs';

export class CdkStepFunctionsMapStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // データ一覧を取得するLambda関数
    const getListLambda = new NodejsFunction(
      this,
      'GetListLambda',
      {
        entry: './lambda/get-list.ts',
        handler: 'handler',
        timeout: Duration.minutes(10),
        memorySize: 256,
        runtime: Runtime.NODEJS_18_X,
      }
    )
    // 各データに対して処理するLambda関数
    const someProcessLambda = new NodejsFunction(
      this,
      'SomeProcessLambda',
      {
        entry: './lambda/some-process.ts',
        handler: 'handler',
        timeout: Duration.minutes(10),
        memorySize: 256,
        runtime: Runtime.NODEJS_18_X,
      }
    )

    // データ一覧を取得するLambda関数を実行するプロセス
    const getListProcessState = new LambdaInvoke(
      this,
      'getListProcessState',
      {
        lambdaFunction: getListLambda,
      }
    )
    // 各データに対して処理するLambda関数を実行するプロセス
    const someProcessProcessState = new LambdaInvoke(
      this,
      'someProcessProcessState',
      {
        lambdaFunction: someProcessLambda,
      }
    )
    // 人物一覧を分割するためのMapステート
    const mapPersonsProcessState = new Map(this, 'mapPersons', {
      itemsPath: '$.Payload',
      maxConcurrency: 5,
      resultPath: '$.mapOutput',
      parameters: {
        param: {
          'name.$': '$$.Map.Item.Value.name',
          'age.$': '$$.Map.Item.Value.age',
        },
      },
    })
    // Map関数のイテレータを指定
    mapPersonsProcessState.iterator(someProcessProcessState)

    // リトライ設定
    getListProcessState.addRetry({
      interval: Duration.seconds(10),
      maxAttempts: 2,
      backoffRate: 2,
    })
    someProcessProcessState.addRetry({
      interval: Duration.seconds(10),
      maxAttempts: 2,
      backoffRate: 2,
    })

    // Step Functionsを作成
    const sampleStateMachine = new StateMachine(
      this,
      'sampleStateMachine',
      {
        stateMachineName: `sampleStateMachine`,
        definition:
          getListProcessState.next(mapPersonsProcessState),
        logs: {
          level: LogLevel.ALL,
          destination: new LogGroup(
            this,
            'SampleStateMachineLogGroup',
            {
              retention: RetentionDays.ONE_WEEK,
            }
          ),
        },
      }
    )
  }
}

まず、NodejsFunctionで作成したget-list.tssome-process.tsをLambda関数化します。そして、LambdaInvokeでStep FunctionsからLambda関数を呼び出すためのアクションを作成します。

MapステートのitemPathは、処理したい配列が格納されているJSONパスを指定します。今回の場合、getListからの出力は以下のようなJSONになります。

{
  "ExecutedVersion": "$LATEST",
  "Payload": [
    {
      "name": "Tanaka",
      "age": 20,
      "address": "Tokyo"
    },
    {
      "name": "Mizuno",
      "age": 30,
      "address": "Osaka"
    },
    {
      "name": "Yamada",
      "age": 25,
      "address": "Nagoya"
    },
    {
      "name": "Sato",
      "age": 28,
      "address": "Fukuoka"
    }
  ],
(以下略)

$はオブジェクトそのものを指すので、処理したい配列が格納されているパスは$.Payloadとなります。

また、someProcessは引数として以下のオブジェクト型を要求していました。

param: {
    name: string
    age: number
}

そこで、Mapステートのparametersを指定して、この型に合致する値が渡されるようにしています。$$.Map.Item.ValueにはMapステートで分割された配列の各要素、つまり

{
    "name": "Tanaka",
    "age": 20,
    "address": "Tokyo"
}

のような値が入っています。そこで、そのうちのnameageを抜き出して指定しています。

Mapステートの同時実行数を5に設定しているので、配列のうち5つずつ並列に処理されることになります。

そして、Mapステートで分割された各要素に対して実行させる処理として、someProcessを設定しています。

Step Functionsを作成する際には、まずgetListを呼び出し、その次にMapステートが実行されるように定義しています。MapステートのイテレータとしてsomeProcessを呼ぶように既に指定しているので、これだけの定義で大丈夫です。

動作確認

CDKをデプロイします。

cdk deploy

デプロイの途中でこのようなメッセージが表示された場合、yを入力します。

スタックのARNとTotal timeが表示されればデプロイ完了です。

AWSマネジメントコンソールでStep Functionsのページを開きます。sampleStateMachineが作成されています。

sampleStateMachineをクリックして、ページ下部の「実行を開始」をクリックします。

このようなウインドウが表示されますが、最初に実行されるgetListは特に入力はあってもなくても関係がないので、このままでOKです。

実行するとこのように進行状況が図で表示されます。

数秒待つと緑色に変わり、処理が成功します。

どのような処理が行われたかは「イベント」を見るとわかります。

上記のイベントのうち、MapStateEnteredイベントを開いてみると、以下のようにイベントが定義されています。getListで取得した人物のデータが配列で渡されています。

{
  "name": "mapPersons",
  "input": {
    "ExecutedVersion": "$LATEST",
    "Payload": [
      {
        "name": "Tanaka",
        "age": 20,
        "address": "Tokyo"
      },
      {
        "name": "Mizuno",
        "age": 30,
        "address": "Osaka"
      },
      {
        "name": "Yamada",
        "age": 25,
        "address": "Nagoya"
      },
      {
        "name": "Sato",
        "age": 28,
        "address": "Fukuoka"
      }
    ],
(以下略)

そして、someProcessProcessStateイベントを見てみると、配列の要素の一つがインプットになっていることがわかります。

開始から終了までの時間を見てみると、6秒ほどでした。someProcessでは5秒待つ処理を入れており、順次実行されるとすると20秒かかるはずなので、並列に実行されたことがわかります。

おわりに

同じ処理を複数のデータに対して並列に実行させたいというケースはよくあると思います。そういうときにStep Functionsを使って簡単に実装できるので便利だと思いました。

この記事がどなたかの参考になれば幸いです。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.